/*******************************************************************************
 * Package: com.song.bigdata.stream
 * Type:    WorkDataStream
 * Date:    2022-10-23 15:42
 *
 * Copyright (c) 2022 HUANENG GUICHENG TRUST CORP.,LTD All Rights Reserved.
 *
 * You may not use this file except in compliance with the License.
 *******************************************************************************/
package com.song.bigdata.stream;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.concurrent.TimeUnit;

/**
 * Word-count demo: reads whitespace-separated tokens from a socket text stream
 * and prints a running count per token.
 * <p>
 * Note: without an explicit restart strategy the job fails with
 * "org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy";
 * see https://blog.csdn.net/liaomingwu/article/details/123051704 for background.
 *
 * @author Songxianyang
 * @date 2022-10-23 15:42
 */

public class WorkDataStream {

    /** Socket source host used when no CLI argument is supplied. */
    private static final String DEFAULT_HOST = "localhost";
    /** Socket source port used when no CLI argument is supplied. */
    private static final int DEFAULT_PORT = 9999;

    /**
     * Entry point: builds and runs a streaming word-count over a socket source
     * (start one locally with {@code nc -lk 9999}).
     *
     * @param args optional overrides: {@code args[0]} = host, {@code args[1]} = port;
     *             defaults to {@code localhost:9999} when absent (backward compatible)
     * @throws Exception if the Flink job fails or the socket cannot be reached
     */
    public static void main(String[] args) throws Exception {
        // Allow host/port to come from the command line; keep the old hard-coded
        // values as defaults so existing invocations behave identically.
        String host = args.length > 0 ? args[0] : DEFAULT_HOST;
        int port = args.length > 1 ? Integer.parseInt(args[1]) : DEFAULT_PORT;

        // Create the streaming execution environment.
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        // Fixed-delay restart strategy; without one, any task failure surfaces as
        // "Recovery is suppressed by NoRestartBackoffTimeStrategy" and kills the job.
        environment.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3,                             // max restart attempts
                Time.of(10, TimeUnit.SECONDS)  // delay between attempts
        ));

        // Live text lines from nc, e.g.:
        //   song 1
        //   xian 1
        //   song 2
        DataStreamSource<String> dataStreamSource = environment.socketTextStream(host, port);

        // Tokenize each line into (word, 1) pairs. split("\\s+") is more robust than
        // split(" "): it collapses runs of spaces/tabs, and the isEmpty() guard stops
        // blank lines from emitting a spurious ("", 1) tuple.
        SingleOutputStreamOperator<Tuple2<String, Long>> pairs = dataStreamSource
                .flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
                    for (String word : line.trim().split("\\s+")) {
                        if (!word.isEmpty()) {
                            out.collect(Tuple2.of(word, 1L));
                        }
                    }
                })
                // Lambda generic types are erased; declare the output type explicitly.
                .returns(Types.TUPLE(Types.STRING, Types.LONG));

        // Key by the word and keep a running sum of the counts.
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = pairs.keyBy(data -> data.f0).sum(1);
        sum.print();

        // Named job for easier identification in the Flink web UI / logs.
        environment.execute("WorkDataStream word count");
    }
}
